package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"gitee.com/zackeus/go-boot/rocketmq"
	"gitee.com/zackeus/go-boot/rocketmq/consumer"
	"gitee.com/zackeus/go-boot/rocketmq/primitive"
	"gitee.com/zackeus/go-boot/rocketmq/rlog"
	"gitee.com/zackeus/go-zero/core/logx"
)

// Settings for the example consumer, grouped by role: where to connect,
// what to consume, and the credentials used to authenticate.
const (
	// Endpoint is the single address handed to the passthrough resolver.
	Endpoint = "http://rmq.nameserver1.server:9876"

	// Topic is the topic the consumer subscribes to.
	Topic = "test-topic"
	// ConsumerGroup is the group name the consumer is created with.
	ConsumerGroup = "test-group"

	// AccessKey and SecretKey are the credentials passed to the consumer.
	// NOTE(review): these are hard-coded in source; confirm they are
	// throwaway test values and not real secrets.
	AccessKey = "testuser"
	SecretKey = "yulon123"
)

// main runs an example push consumer in broadcasting mode: it configures
// logging, builds the consumer, subscribes to Topic with a callback that
// prints every received message, starts the consumer, and then blocks until
// the process receives an interrupt or termination signal, at which point the
// consumer is shut down.
//
// Failure to create, subscribe, or start the consumer is printed and the
// process exits with status -1.
func main() {
	if err := logx.SetUp(logx.LogConf{Mode: "console", Encoding: "plain"}); err != nil {
		fmt.Println(err)
		return
	}
	rlog.SetLevel("error")

	// Register for shutdown signals up front. The channel is buffered because
	// signal.Notify never blocks when sending: with an unbuffered channel a
	// signal that arrives before main reaches the receive would be dropped.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt, syscall.SIGTERM)

	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName(ConsumerGroup),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{Endpoint})),
		consumer.WithConsumeMessageBatchMaxSize(1),
		// Broadcasting consumption model.
		consumer.WithConsumerModel(consumer.BroadCasting),
		// Position from which consumption starts.
		consumer.WithConsumeFromWhere(consumer.ConsumeFromLastOffset),
		// Authentication credentials.
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: AccessKey,
			SecretKey: SecretKey,
		}),
	)
	if err != nil {
		// Without a consumer there is nothing to subscribe or start.
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	// The callback prints each delivered message and reports success.
	handler := func(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for i := range ext {
			fmt.Printf("subscribe callback: %v \n", ext[i])
		}
		return consumer.ConsumeSuccess, nil
	}
	if err := c.Subscribe(Topic, consumer.MessageSelector{}, handler); err != nil {
		// Starting a consumer whose only subscription failed would never
		// deliver anything, so treat this as fatal like a Start failure.
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	// Note: start after subscribe
	if err := c.Start(); err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	// Block until a shutdown signal arrives, then stop the consumer.
	<-sig
	if err := c.Shutdown(); err != nil {
		fmt.Printf("shutdown Consumer error: %s", err.Error())
	}
}
